import sys

sys.path.append('..')
from pyspark.sql import Window
from pyspark.sql.functions import *

from bigdata.Personas import Hive_process

if __name__ == '__main__':
    # User product-type preference: rank each user's purchased product
    # types by how often they bought them.
    hive = Hive_process.Hive_process()

    # Load the goods and orders tables from Hive.
    goods_df = hive.read('shopping.tbl_goods')
    orders_df = hive.read('shopping.tbl_orders')

    # Join the two tables on the order number
    # (orders.ordersn <-> goods.cordersn).
    goods_orders_df = goods_df.join(
        orders_df,
        orders_df['ordersn'] == goods_df['cordersn'],
    )

    # Count how many times each user bought each product type.
    preference_df = goods_orders_df.groupby('memberId', 'productType').count()

    # Rank the counts per user (largest first) to obtain the preference order.
    window = Window.partitionBy('memberId').orderBy(preference_df['count'].desc())
    preference_df = preference_df.select(
        'memberId',
        'productType',
        'count',
        dense_rank().over(window).alias('rank'),
    )
